﻿#include <mutex>
#include <iostream>
#include <unordered_map>
#include <list>
#include <unordered_set>
#include <condition_variable>

#include "threadsafe.h"
#include "eventloop.h"

using namespace ThreadSafe;

struct WarpedEvent{
	ThreadSafe::Event* event;
	ThreadSafe::ThreadSafeBase* target;
};

typedef std::shared_ptr<WarpedEvent> WarpedEventPtr;
typedef std::weak_ptr<WarpedEvent> WarpedEventWPtr;

struct EventQueue{
    int useCount;
    std::list<WarpedEventWPtr> queue;
    std::mutex mutex;
    std::condition_variable wait;
};
typedef std::shared_ptr<EventQueue> EventQueuePtr;
typedef std::weak_ptr<EventQueue> EventQueueWPtr;

class EventLoopManager{
private:
    EventLoopManager()
    {
        MaxEventId = 1;
    }
    ~EventLoopManager()
    {

    }

public:
    static EventLoopManager* Instance()
    {
        static EventLoopManager i;
        return &i;
    }

    void PostEvent(ThreadSafeBase* receiver,Event* ev){
        EventQueuePtr q;
        MaxEventId ++;
        WarpedEventPtr e = std::make_shared<WarpedEvent>();
		e->event = ev;
		e->target = receiver;

        {
            std::lock_guard<std::mutex> lock(mutex);
			allEvent[receiver][ev] = e;//事件送入全局池
			auto id = objThread[receiver];
            if(!queues.count(id)){//该线程还未开启事件循环，先将事件放到临时队列
                tempEvent[id].push_back(e);
                return;
            }

            q = queues[id];
        }

        {
            std::lock_guard<std::mutex> l(q->mutex);
            q->queue.push_back(e);
            q->wait.notify_all();
        }
    }

    void CancelEvent(Event* id){
		std::lock_guard<std::mutex> lock(mutex);
		for (auto &i : allEvent) {
			if (i.second.count(id)) {
				delete i.second[id]->event;
				i.second.erase(id);
				return;
			}
		}
    }

	void CancelEvent(ThreadSafeBase* target, Event* id) {
		std::lock_guard<std::mutex> lock(mutex);
		if (allEvent.count(target)) {
			delete allEvent[target][id]->event;
			allEvent[target].erase(id);
		}
	}

    EventQueuePtr GetOrAcquireQueue(EventLoop::ThreadId id){
        std::lock_guard<std::mutex> lock(mutex);
        EventQueuePtr ret;

        if(!queues.count(id)){
            ret = std::make_shared<EventQueue>();
            ret->useCount = 1;
            if(tempEvent.count(id)){
                ret->queue = std::move(tempEvent[id]);
                tempEvent.erase(id);
            }
            queues[id] = ret;
        }
        else{
            ret = queues[id];
            ret->useCount ++;
        }

        return ret;
    }

    void ReleaseQueue(EventLoop::ThreadId id){
        std::lock_guard<std::mutex> lock(mutex);
        if(queues.count(id)){
            EventQueuePtr q = queues[id];
            std::lock_guard<std::mutex> lock2(q->mutex);
            q->useCount--;
            if(q->useCount == 0){
                queues.erase(id);
				for (auto &i : q->queue) {
					if (!i.expired()) {
						tempEvent[id].push_back(i);
					}
				}
            }
        }
    }

	void Register(ThreadSafeBase* obj) {
		ThreadId currentThread = std::this_thread::get_id();
		std::lock_guard<std::mutex> lock(mutex);
		objThread[obj] = currentThread;
	}

	void UnRegister(ThreadSafeBase* obj) {//对象销毁，将队列中的事件清除
		std::lock_guard<std::mutex> lock(mutex);
		objThread.erase(obj);
		allEvent.erase(obj);
	}

	ThreadId GetThread(ThreadSafeBase* obj) {
		std::lock_guard<std::mutex> lock(mutex);
		return objThread[obj];
	}

	void MoveToThread(ThreadSafeBase* obj, ThreadId newThread) {
		std::unordered_map<Event*, WarpedEventPtr> events;

		{
			std::lock_guard<std::mutex> lock(mutex);

			auto thread = objThread[obj];
			auto currentThread = std::this_thread::get_id();
			if (thread != currentThread)
			{
				std::cerr << "MoveToThread:该函数只能在对象所在的线程被执行" << std::endl;
				return;
			}

			events = std::move(allEvent[obj]);//将该线程对应的事件拿出
			allEvent.erase(obj);

			objThread[obj] = newThread;
		}

		for (auto &i : events) {
			PostEvent(i.second->target, i.second->event);//重新抛出事件
		}
	}

private:

	//当对象移动到另一线程时，将原线程中的事件摘除，放入到新线程的事件队列
	//当对象销毁时，将事件队列中所有和该对象相关的事件全部移除
	//当某线程的事件队列销毁时，事件队列中的事件会被转移到临时队列中
	//当某线程结束后，但是仍然有对象属于该线程，因为无法再在该线程上开启事件循环，因此对象的事件无法被执行。直到事件随着对象销毁而被撤销，或者对象被移入了一个新的线程中(qt中线程销毁后对象会被自动移入到主循环,这里做不到是因为不能主动监控到线程销毁事件)
	
    std::unordered_map<EventLoop::ThreadId,EventQueuePtr> queues;//当线程开启底部事件循环时，会创建一个队列并将缓存队列中的事件送入。

    std::unordered_map<EventLoop::ThreadId,std::list<WarpedEventWPtr>> tempEvent;//事件的缓存队列，当该线程没有正在开启的事件循环时，事件会送入该队列

	std::unordered_map<ThreadSafe::ThreadSafeBase*, ThreadId> objThread;//存储每个对象对应的线程

	std::unordered_map<ThreadSafeBase*, std::unordered_map<Event*, WarpedEventPtr>> allEvent;

    unsigned long long MaxEventId;

    std::mutex mutex;
};

struct EventLoop::Private{

    Private(){
        isActive = false;
        closeId = 0;
    }

    int closeId;
    bool isActive;
    EventQueuePtr queue;
    std::mutex mutex;

};

EventLoop::EventLoop():_P(new Private)
{

}

EventLoop::~EventLoop()
{
    exit();
}

int EventLoop::exec()
{
	ThreadId currentid = std::this_thread::get_id();
    {
        std::lock_guard<std::mutex> lock(_P->mutex);

        if(_P->isActive){
            std::cerr << "warnning EventLoop already started!" << std::endl;
            return -1;
        }

        _P->isActive = true;

        _P->queue = EventLoopManager::Instance()->GetOrAcquireQueue(currentid);
    }

    std::shared_ptr<int> clear(new int(0),[=](int *i){
        _P->queue.reset();
        delete i;
    });

    while (true) {
        if(!_P->isActive) break;

        WarpedEventWPtr eventw;
        {
            std::unique_lock<std::mutex> lock(_P->queue->mutex);
            while (_P->queue->queue.empty()) {
                _P->queue->wait.wait(lock);
                if(!_P->isActive) return _P->closeId;
            }

			eventw = _P->queue->queue.front();
            _P->queue->queue.pop_front();
        }
		
		auto event = eventw.lock();
		if (event) {
			bool ok = event->target->FilterEvent(event->event);
			if (!ok) {
				event->target->HandleEvent(event->event);
			}

			EventLoopManager::Instance()->CancelEvent(event->target,event->event);
		}
    }

	EventLoopManager::Instance()->ReleaseQueue(currentid);

    return _P->closeId;
}

void EventLoop::exit(int code)
{
    std::lock_guard<std::mutex> lock(_P->mutex);
    if(_P->isActive)
    {
        _P->isActive = false;
        _P->closeId = code;
        _P->queue->wait.notify_all();
    }
}

void EventLoop::PostEvent(ThreadSafe::ThreadSafeBase * receiver, ThreadSafe::Event * event)
{
	EventLoopManager::Instance()->PostEvent(receiver, event);
}

void EventLoop::CancelEvent(ThreadSafe::Event * event)
{
	EventLoopManager::Instance()->CancelEvent(event);
}

void EventLoop::Register(ThreadSafe::ThreadSafeBase * obj)
{
	EventLoopManager::Instance()->Register(obj);
}

void EventLoop::MoveThread(ThreadSafe::ThreadSafeBase* obj, ThreadId NewThread)
{
	EventLoopManager::Instance()->MoveToThread(obj, NewThread);
}

ThreadId EventLoop::GetThread(ThreadSafe::ThreadSafeBase * obj)
{
	return EventLoopManager::Instance()->GetThread(obj);
}

void EventLoop::UnRegister(ThreadSafe::ThreadSafeBase * obj)
{
	EventLoopManager::Instance()->UnRegister(obj);
}
